🕺Kapoeira💃


Presentation

Mehdi Rebiai

Presentation

Johanna Vauchel

Take Away 🎁

  • Discover a new tool to test your Kafka Streams applications

  • Improve communication between PO, QA and DEV

  • Tips to use it every day

  • Have a good time (we hope)

📽️ Kapoeira story 🎬


Enrich and collect data


We are perfect!

Data is perfect!

[Image: pipeline example]

NO!

NO!

[Image: pipeline example, with bad data]

Solution?

TEST OUR STREAMS!

How to test?

Fast and efficient…

Scala Test Example

package com.lectra.kafka.stream.example

import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer}
import org.apache.kafka.streams._
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers
import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, GivenWhenThen}

import java.io.File
import java.util.UUID

class KafkaStreamSelectKeyTest extends AnyFlatSpec with Matchers with BeforeAndAfterEach with BeforeAndAfterAll with GivenWhenThen {

  private val stringSerializer = new StringSerializer()
  private val stringDeserializer = new StringDeserializer()

  private var driver: TopologyTestDriver = _
  private var inputTopic: TestInputTopic[String, String] = _
  private var outputTopic: TestOutputTopic[String, String] = _

  private def tempDir: File = {
    val ioDir = System.getProperty("java.io.tmpdir")
    val f = new File(ioDir, "kafka-" + UUID.randomUUID().toString)
    f.mkdirs()
    f.deleteOnExit()
    f
  }

  private def buildTopology(): Topology = {
    import org.apache.kafka.streams.scala.StreamsBuilder
    val builder = new StreamsBuilder
    KafkaStreamSelectKey.topology(builder)
    builder.build()
  }

  override def beforeEach(): Unit = {
    // Use a fresh state directory per test, set on the same config the driver is built with
    KafkaStreamSelectKey.config.put(StreamsConfig.STATE_DIR_CONFIG, tempDir.getAbsolutePath)
    driver = new TopologyTestDriver(buildTopology(), KafkaStreamSelectKey.config)
    inputTopic = driver.createInputTopic(KafkaStreamSelectKey.topicIn, stringSerializer, stringSerializer)
    outputTopic = driver.createOutputTopic(KafkaStreamSelectKey.topicChangedKey, stringDeserializer, stringDeserializer)
  }

  override def afterEach(): Unit = {
    driver.close()
  }


  "Nominal case for select" should "change the key of records by combining key and value with -" in {
    val key = "mykey"
    val value = "myvalue"
    val key2 = "yourkey"
    val value2 = "yourvalue"

    inputTopic.pipeInput(key, value)
    inputTopic.pipeInput(key2, value2)
    val expectedKey1 = s"$key-$value"
    val expectedKey2 = s"$key2-$value2"

    outputTopic.getQueueSize shouldBe 2
    outputTopic.readKeyValue() shouldBe new KeyValue(expectedKey1, value)
    outputTopic.readKeyValue() shouldBe new KeyValue(expectedKey2, value2)

  }


}

Happy 😀…🤮

… But it’s a mocked infrastructure

It does not test integration with a real Kafka cluster

… And only unit tests

How do we test several streams together?

… And it’s not for QA (=👮)

… And it’s technical code

How do we communicate between DEV, PO and QA?

What do we need?

[Image: test pyramid]

Integration tests with a simple syntax

Inspiration: Karate

Our context != HTTP

Integration with Kafka Streams!

What is Kapoeira?

A testing tool built on Cucumber Scala, using a dedicated Gherkin DSL.

What is Gherkin?

Feature: A calculator example

  Scenario: The sum example
    Given a = 2
    And b = 3
    When a + b
    Then result == 5

  Scenario: The mult example
    Given a = 2
    And b = 3
    When a * b
    Then result == 6

What is Cucumber?

// Assumes cucumber-java 5+ for the annotation package
import io.cucumber.java.en.{Given, Then, When}

import scala.collection.mutable

class StepDefinitions {
  // Variables declared by "Given" steps, keyed by name
  private val variables = mutable.Map[String, Long]()
  private var result: Long = 0

  // Fails the step if the variable was never declared
  private def lookup(name: String): Long =
    variables.getOrElse(name, throw new AssertionError("Unknown variable " + name))

  @Given("^\\s*([a-zA-Z]+)\\s*=\\s*(\\d+)\\s*$")
  def addVariable(name: String, value: Long): Unit = {
    variables += (name -> value)
  }

  @When("^\\s*([a-zA-Z]+)\\s*\\+\\s*([a-zA-Z]+)\\s*$")
  def sum(left: String, right: String): Unit = {
    result = lookup(left) + lookup(right)
  }

  @When("^\\s*([a-zA-Z]+)\\s*\\*\\s*([a-zA-Z]+)\\s*$")
  def mult(left: String, right: String): Unit = {
    result = lookup(left) * lookup(right)
  }

  // Renamed from `result`: a method cannot share its name with the `result` field
  @Then("^\\s*result\\s*==\\s*(\\d+)\\s*$")
  def checkResult(expectedResult: Long): Unit = {
    assert(result == expectedResult, s"Expected $expectedResult but got $result")
  }
}
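
Conceptually, Cucumber reads each Gherkin step, finds the step definition whose pattern matches, and calls it with the captured groups. The sketch below mimics that dispatch with plain Scala regexes and no Cucumber dependency. MiniStepRunner is a hypothetical name, and this is only an illustration of the idea, not how Cucumber is implemented internally.

```scala
import scala.collection.mutable
import scala.util.matching.Regex

// Hypothetical, dependency-free illustration of regex-based step dispatch
object MiniStepRunner {
  val variables: mutable.Map[String, Long] = mutable.Map.empty
  var result: Long = 0L

  // Same patterns as the step definitions; Regex extractors match the whole string
  private val Assign: Regex = """\s*([a-zA-Z]+)\s*=\s*(\d+)\s*""".r
  private val Sum: Regex    = """\s*([a-zA-Z]+)\s*\+\s*([a-zA-Z]+)\s*""".r
  private val Check: Regex  = """\s*result\s*==\s*(\d+)\s*""".r

  // Strips the Gherkin keyword, then runs the first pattern that matches
  def run(step: String): Unit = {
    val text = step.trim.replaceFirst("^(Given|When|Then|And)\\s+", "")
    text match {
      case Assign(name, value) => variables(name) = value.toLong
      case Sum(left, right)    => result = variables(left) + variables(right)
      case Check(expected)     =>
        assert(result == expected.toLong, s"Expected $expected but got $result")
      case other =>
        throw new IllegalArgumentException(s"No step definition matches: $other")
    }
  }
}
```

Feeding it the four steps of the sum scenario ("Given a = 2", "And b = 3", "When a + b", "Then result == 5") passes and leaves MiniStepRunner.result at 5.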

How does it work?

[Image: architecture diagram]

How does it work?

[Image: Kapoeira diagram]

2020 - Birth of Kapoeira

  • Inner Source @Lectra

  • First syntax created with a QA

  • Backend calling Confluent CLI

CLI…

# Console producer
kafka-console-producer \
  --topic orders \
  --bootstrap-server broker:9092

# Console consumer
kafka-console-consumer \
  --topic orders \
  --bootstrap-server broker:9092 \
  --from-beginning

2020 - Custom backend

  • Specific Scala implementation for Kafka Consumer/Producer

  • Better syntax with Gherkin Datatable

2021 - ZIO

To improve performance and add a parallel mode?

2023 - Open Source

2024 - New features

Thanks to you!

Demo


Fries.feature

Feature: Fries 🍟 feature

  Background:
    Given input topic
      | topic  | alias     | key_type | value_type |
      | potato | potato-in | string   | string     |
    And output topic
      | topic       | alias           | key_type | value_type | readTimeoutInSecond |
      | side-dishes | side-dishes-out | string   | string     | 20                  |
    And var uuid = call function: uuid

  Scenario: Transformation
    When records with key and value are sent
      | topic_alias | key        | value |
      | potato-in   | 🤤_${uuid} | 🥔    |
    Then expected records
      | topic_alias     | key        | value  |
      | side-dishes-out | 🤤_${uuid} | result |
    And assert result $ == "🍟"
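
A stream that would satisfy this scenario could look like the sketch below. It is an assumption, not the demo's actual code: the object name FriesTopology, the fry rule and the pass-through for non-potato values are invented for illustration. Only the topic names potato and side-dishes come from the feature file. It also assumes kafka-streams-scala 2.7 or later on the classpath.

```scala
import org.apache.kafka.streams.Topology
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.StreamsBuilder
import org.apache.kafka.streams.scala.serialization.Serdes._

// Hypothetical topology: the real demo stream is not shown in these slides
object FriesTopology {

  // Assumed business rule: a potato becomes fries, any other value passes through
  def fry(value: String): String = if (value == "🥔") "🍟" else value

  // Reads "potato", transforms each value and writes to "side-dishes"; keys are untouched
  def build(): Topology = {
    val builder = new StreamsBuilder
    builder
      .stream[String, String]("potato")
      .mapValues(v => fry(v))
      .to("side-dishes")
    builder.build()
  }
}
```

Because the key is left as-is, the record sent with key 🤤_${uuid} comes out with the same key, which is what lets the scenario pick out its own record on a shared topic.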

An architecture

[Image: Burger Quiz architecture]

Docker commands

docker compose build --no-cache
docker compose up -d
docker restart kapoeira

burger.feature

Feature: Burger 🍔 feature

  Background:
    Given input topic
      | topic     | alias        | key_type | value_type |
      | bread     | bread-in     | string   | string     |
      | vegetable | vegetable-in | string   | string     |
      | meat      | meat-in      | string   | string     |
    And output topic
      | topic  | alias      | key_type | value_type | readTimeoutInSecond |
      | burger | burger-out | string   | string     | 5                   |
    And var uuid = call function: uuid

  Scenario: Nominal
    When records with key and value are sent
      | topic_alias  | key        | value |
      | bread-in     | 🤤_${uuid} | 🍞    |
      | vegetable-in | 🤤_${uuid} | 🍅    |
      | meat-in      | 🤤_${uuid} | 🥩    |
    Then expected records
      | topic_alias | key        | value |
      | burger-out  | 🤤_${uuid} | order |
    And assert order $ == "🍔"

  Scenario: Not a burger
    When records with key and value are sent
      | topic_alias  | key        | value |
      | bread-in     | 🤤_${uuid} | 🍞    |
      | vegetable-in | 🤤_${uuid} | 🥕    |
      | meat-in      | 🤤_${uuid} | 🥩    |
    Then expected records
      | topic_alias | key        | value |
      | burger-out  | 🤤_${uuid} | order |
    And assert order $ == "🍞 + 🥕 + 🥩"

  Scenario Outline: Many customers
    When records with key and value are sent
      | topic_alias  | key            | value       |
      | bread-in     | <user>_${uuid} | <bread>     |
      | vegetable-in | <user>_${uuid} | <vegetable> |
      | meat-in      | <user>_${uuid} | <meat>      |
    Then expected records
      | topic_alias | key            | value |
      | burger-out  | <user>_${uuid} | order |
    And assert order $ == "<result>"

    Examples:
      | user | bread | vegetable | meat | result |
      | 🤤   | 🍞    | 🍅        | 🥩   | 🍔     |
      | 😋   | 🍞    | 🍅        | 🍗   | 🍔     |
      | 😡   | 🍞    | 🍅        | 🐟   | 🍔     |

meal.feature

Feature: Meal 🛍 feature

  Background:
    Given input topic
      | topic       | alias          | key_type | value_type |
      | bread       | bread-in       | string   | string     |
      | vegetable   | vegetable-in   | string   | string     |
      | meat        | meat-in        | string   | string     |
      | side-dishes | side-dishes-in | string   | string     |
    And output topic
      | topic | alias    | key_type | value_type | readTimeoutInSecond |
      | meal  | meal-out | string   | string     | 20                  |
    And var uuid = call function: uuid

  Scenario: Left Join with Left first
    When records with key and value are sent
      | topic_alias    | key        | value |
      | bread-in       | 🤤_${uuid} | 🍞    |
      | vegetable-in   | 🤤_${uuid} | 🍅    |
      | meat-in        | 🤤_${uuid} | 🥩    |
      | side-dishes-in | 🤤_${uuid} | 🥔🍺  |
    Then expected records
      | topic_alias | key        | value |
      | meal-out    | 🤤_${uuid} | notif |
      | meal-out    | 🤤_${uuid} | order |
    And assert notif $ == "🍔"
    And assert order $ == "🛍(🍔 + 🍟🍺)"

  Scenario: Left Join with Right first
    When records with key and value are sent
      | topic_alias    | key        | value |
      | side-dishes-in | 🤤_${uuid} | 🥔🍷  |
      | bread-in       | 🤤_${uuid} | 🍞    |
      | vegetable-in   | 🤤_${uuid} | 🍅    |
      | meat-in        | 🤤_${uuid} | 🥩    |
    Then expected records
      | topic_alias | key        | value |
      | meal-out    | 🤤_${uuid} | order |
    And assert order $ == "🛍(🍔 + 🍟🍷)"

Advanced example

Feature: upper-case
  Background:
    Given input topic
      | topic              | alias    | key_type | value_type |
      | topic-simple-value | topic_in | string   | string     |

    And output topic
      | topic                   | alias            | key_type | value_type | readTimeoutInSecond |
      | topic-simple-value      | topic_string_out | string   | string     | 5                   |
      | topic-upper-case-string | topic_out        | string   | string     | 5                   |
    And var myKey = call function: uuid

  Scenario: My first scenario
    When records with key and value are sent
      | topic_alias | key      | value |
      | topic_in    | ${myKey} | a     |
      | topic_in    | ${myKey} | b     |
      | topic_in    | ${myKey} | c     |
    Then expected records
      | topic_alias      | key      | value    |
      | topic_string_out | ${myKey} | input_1  |
      | topic_string_out | ${myKey} | input_2  |
      | topic_string_out | ${myKey} | input_3  |
      | topic_out        | ${myKey} | result_1 |
      | topic_out        | ${myKey} | result_2 |
      | topic_out        | ${myKey} | result_3 |
    And assert input_1 $ == "a"
    And assert input_2 $ == "b"
    And assert input_3 $ == "c"

    And assert result_1 $ == "A"
    And assert result_2 $ == "B"
    And assert result_3 $ == "C"

Report

[Image: report]

Lessons learned (REX)

  • 👐 Big community at Lectra

  • ✏️ Easy for QA to enrich existing tests

  • 🤝 Used as acceptance tests and as specifications during story grooming

  • 🔄 Used as end-to-end tests

Advantages

  • Runs against real Kafka infrastructure

  • Simple to use

  • Eases communication between PO, QA and DEV

  • Tests as documentation

  • Tests as acceptance criteria for stories

Want to use it?

How to build?

docker build -t kapoeira:latest .

How to use?

docker run --rm -ti \
-v <PATH_TO_YOUR_FEATURES_FOLDER>:/features \
-v /var/run/docker.sock:/var/run/docker.sock \
-e KAFKA_BOOTSTRAP_SERVER=<HOST:PORT[,HOST2:PORT2,HOST3:PORT3,...]> \
-e KAFKA_SCHEMA_REGISTRY_URL=<URL> \
-e KAFKA_USER=<XXX> \
-e KAFKA_PASSWORD=<****> \
-e JAAS_AUTHENT=<true (default) | false> \
-e LOGGING_LEVEL=<INFO (default) | ERROR | ...> \
-e THREADS=<8 (default) | ... > \
lectratech/kapoeira

How to contribute?

TODO

Thank you!

Thanks for your feedback